package com.shujia.sql

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object Demo2ClazzNum {
  def main(args: Array[String]): Unit = {
    val settings: EnvironmentSettings = EnvironmentSettings
      .newInstance()
      //.inStreamingMode()//流处理模式
      .inBatchMode() //批处理模式
      .build()

    val table: TableEnvironment = TableEnvironment.create(settings)


    //创建按source表
    table.executeSql(
      """
        |CREATE TABLE student (
        |        |  id STRING,
        |        |  name STRING,
        |        |  age INT,
        |        |  gender STRING,
        |        |  clazz STRING
        |        |)  WITH (
        |        |  'connector' = 'filesystem',           -- 必选：指定连接器类型
        |        |  'path' = 'data/students.json',        -- 必选：指定路径
        |        |  'format' = 'json'                     -- 必选：文件系统连接器指定 format
        |        |)
        |
      """.stripMargin)


    //创建sinkb表

    table.executeSql(
      """
        |CREATE TABLE clazz_num (
        |  clazz STRING,
        |  c BIGINT
        |)  WITH (
        |  'connector' = 'filesystem',           -- 必选：指定连接器类型
        |  'path' = 'data/clazz_num',        -- 必选：指定路径
        |  'format' = 'json'                     -- 必选：文件系统连接器指定 format
        |)
      """.stripMargin)



    //统计班级的人数
    table.executeSql(
      """
        |insert into clazz_num
        |select clazz,count(1) as c from student
        |group by clazz
        |
      """.stripMargin)

  }

}
